
%%html
<link href='https://fonts.googleapis.com/css?family=Roboto' rel='stylesheet'>
<style>
body {
font-family: 'Roboto', 'Helvetica', sans-serif;
font-size: 16px;
}
</style>
#Essential Imports
import pandas as pd
import numpy as np
import random
import itertools
import math
import os
import time
#Visualization Libraries
import shapefile
from shapely.geometry import Polygon
from descartes.patch import PolygonPatch
import matplotlib as mpl
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import plotly.express as px
import seaborn as sns
import plotly.graph_objs as go
import plotly.figure_factory as ff
import plotly.offline as pyo
# Set notebook mode to work in offline
pyo.init_notebook_mode()
%matplotlib inline
#Google Big Query
from google.cloud import bigquery
import google.cloud.bigquery.magics
import google.auth
from pandas.io import gbq
import pandas_gbq
#Database libraries
import sqlite3
from sqlalchemy import create_engine
from utils import Clockplot,get_lat_lon,plt_clock,PlotSLD,get_boundaries,Regionmap,Zonemap
#Parallel Computing
import dask.array as da
from dask.distributed import Client
import dask.dataframe as dd
#ML
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark
import pyspark.sql.functions as f
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, GBTRegressor, RandomForestRegressor
from IPython.display import HTML
# Inject a "Show Code" / "Hide Code" toggle button into the rendered notebook.
# The embedded jQuery runs client-side: it hides every input cell on page load
# and flips their visibility (with a 500 ms animation) each time the button is
# pressed. The HTML/JS payload is emitted verbatim via IPython.display.HTML.
HTML('''<script>
function code_toggle() {
if (code_shown){
$('div.input').hide('500');
$('#toggleButton').val('Show Code')
} else {
$('div.input').show('500');
$('#toggleButton').val('Hide Code')
}
code_shown = !code_shown
}
$( document ).ready(function(){
code_shown=false;
$('div.input').hide()
});
</script>
<form action="javascript:code_toggle()">
<input type="submit" id="toggleButton" value="Show Code"></form>''')
A. Introduction
B. Problem Statement
C. Methodology
1. Data Extraction and Storage
2. Data Cleanup and Feature Engineering
3. Exploratory Data Analysis
4. Feature Engineering
5. Modelling
D. Conclusions and Future Work
The high-level workflow for NYC Yellow Taxi Demand prediction and analysis is as follows:
Each step of the workflow will be discussed in detail in the succeeding sections.
#Setting Credentials for accessing google API
# Route BigQuery cell magics through the faster BigQuery Storage API.
google.cloud.bigquery.magics.context.use_bqstorage_api = True
# NOTE(review): hard-coded local Windows path to the service-account key file —
# must be adjusted (or read from configuration) before running on another machine.
credential_path = r"C:\Users\Dell-pc\Desktop\Thinking Machines\nyctlc\mykey.json"
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credential_path
# Application-default credentials scoped to the full Cloud Platform.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
# Making clients.
# Explicit client built from the same service-account key (relative path:
# assumes the notebook's working directory contains mykey.json).
client = bigquery.Client.from_service_account_json("mykey.json")
project_id = "thinking-machines-exam-285522"
# Pull the full 2017 NYC yellow-taxi trip table from the BigQuery public
# dataset and persist it locally (SQLite table + CSV) for the analysis below.
query = """
SELECT *
FROM `bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2017`
"""
# Fix: the `private_key` argument was removed from pandas-gbq; authentication
# now flows from GOOGLE_APPLICATION_CREDENTIALS, which is set above.
df = gbq.read_gbq(query, project_id=project_id)
# Normalise column names: spaces -> underscores.
df = df.rename(columns={c: c.replace(' ', '_') for c in df.columns})
# Derive the zero-padded hour ("HH") from the pickup/drop-off timestamps.
# astype(str) makes this robust whether the columns arrive as strings or as
# datetime64 values (ISO text carries the hour at positions 11:13 either way).
df['pickup_hour'] = df['tpep_pickup_datetime'].astype(str).str[11:13]
df['dropoff_hour'] = df['tpep_dropoff_datetime'].astype(str).str[11:13]
# Fix: open the SQLite store *before* writing to it — the original wrote to a
# connection that was only created later, and shifted the index by an
# undefined variable `j` (a leftover from a chunked-download loop).
nyc_database = sqlite3.connect('nyc_database.db')
df.to_sql('table_record', nyc_database, if_exists='append')
df.to_csv('nyc_database.csv')
#Connecting to Preprocessed Dataset(After performing Data Extraction, Cleaning and Feature Engineering)
nyc_database = sqlite3.connect('nyc_database.db')
# Load the TLC taxi-zone shapefile and flatten its attribute records into a
# DataFrame, then join per-zone coordinates from the project helper
# get_lat_lon() (from utils; presumably keyed by LocationID — confirm there).
sf = shapefile.Reader("taxi_zones.shp")
fields_name = [field[0] for field in sf.fields[1:]]  # fields[0] is the DeletionFlag placeholder
shp_dic = dict(zip(fields_name, list(range(len(fields_name)))))  # field name -> positional index
attributes = sf.records()
shp_attr = [dict(zip(fields_name, attr)) for attr in attributes]
df_loc = pd.DataFrame(shp_attr).join(get_lat_lon(sf).set_index("LocationID"), on="LocationID")
df_loc.head()
# Side-by-side reference maps: boroughs (left) and taxi zones (right).
# Regionmap/Zonemap are project helpers from utils that draw the shapefile.
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15,8))
ax = plt.subplot(1, 2, 1)
ax.set_title("Boroughs in NYC")
Regionmap(ax, sf)
ax = plt.subplot(1, 2, 2)
ax.set_title("Zones in NYC")
Zonemap(ax, sf)
# Per-zone pickup and drop-off totals (trip count plus summed fare revenue),
# aggregated inside SQLite so the full trip table never has to fit in memory.
df_pu = pd.read_sql_query('SELECT PULocationID AS LocationID, count(*) AS PUcount,sum(total_amount) as Amount \
FROM table_record \
GROUP BY PULocationID', nyc_database)
df_do = pd.read_sql_query('SELECT DOLocationID AS LocationID, count(*) AS DOcount, sum(total_amount) as Amount \
FROM table_record \
GROUP BY DOLocationID', nyc_database)
With the selected data, we want to obtain the zones with most pickups and drop-offs.
# Combine the pickup and drop-off aggregates into one per-zone table.
# `template` enumerates every LocationID from 1 up to the maximum so zones
# with no recorded trips still appear (zero-filled) after the outer join.
# Fix: range() excludes its upper bound, so +1 keeps the largest LocationID.
template = pd.DataFrame(list(range(1, max(df_loc['LocationID']) + 1)), columns=["LocationID"])
df_q1 = pd.concat([df_pu, df_do]).join(template.set_index("LocationID"), how='outer', on=["LocationID"]).fillna(0) \
                                 .groupby(["LocationID"], as_index=False) \
                                 .agg({'PUcount': 'sum', 'DOcount': 'sum', 'Amount': 'sum'}) \
                                 .sort_values(by=['LocationID'])
df_q1['TOTALcount'] = df_q1['PUcount'] + df_q1['DOcount']
# Attach human-readable zone and borough names.
loc = df_loc[["LocationID", "zone", "borough"]]
df_q1 = df_q1.merge(loc, on="LocationID")
# Revenue in thousands of dollars, for readability on the heat maps.
df_q1['Amount'] = df_q1['Amount'] / 1000
# LocationID -> metric lookups feed the Zonemap heat shading below; the
# *top3 frames provide the labelled zones.
PUcount = dict(zip(df_q1['LocationID'].tolist(), df_q1['PUcount'].tolist()))
PUtop3 = df_q1.sort_values(by=['PUcount'], ascending=False).set_index("LocationID").head(3)
DOcount = dict(zip(df_q1['LocationID'].tolist(), df_q1['DOcount'].tolist()))
DOtop3 = df_q1.sort_values(by=['DOcount'], ascending=False).set_index("LocationID").head(3)
PUmoney = dict(zip(df_q1['LocationID'].tolist(), df_q1['Amount'].tolist()))
PUMtop3 = df_q1.sort_values(by=['Amount'], ascending=False).set_index("LocationID").head(3)
PUtop3
DOtop3
PUMtop3
# Heat maps of pickup count, drop-off count, and revenue per zone, with the
# top-3 zones labelled on each panel.
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(18,8))
ax = plt.subplot(1, 2, 1)
ax.set_title("Zones with most pickups")
Zonemap(ax, sf, heat=PUcount, text=PUtop3.index.tolist())
ax = plt.subplot(1, 2, 2)
ax.set_title("Zones with most drop-offs")
Zonemap(ax, sf, heat=DOcount, text=DOtop3.index.tolist())
# Separate single-panel figure for the revenue heat map.
plt.rcParams["figure.figsize"] = (10,10)
ax = plt.subplot(1, 1, 1)
ax.set_title("Top 3 Zones with highest revenue")
Zonemap(ax, sf, heat=PUmoney, text=PUMtop3.index.tolist())
It is evident that Yellow Taxis by far earn the most if they pick up their customers from the most popular airports in NYC. Note that Midtown Center is also among the top 3, which is unsurprising as Midtown Manhattan is the world's largest business district. We can also observe that the high-grossing zones are indeed located in Manhattan.
Next, we investigate boroughs with most pickups and drop-offs. In the tables below, we can see that Manhattan is obviously the most popular borough and Staten Island is the least popular borough. Queens and Brooklyn are also popular, although their pickup/drop-off count is less than 10% of Manhattan's.
# Roll the per-zone table up to borough level, most-travelled first.
df_q1_region = df_q1.groupby(["borough"], as_index=False) \
.agg({'PUcount': 'sum', 'DOcount': 'sum', 'TOTALcount': 'sum'}) \
.sort_values(by=['TOTALcount'], ascending=False)
df_q1_region
# borough -> count lookups for the Regionmap heat shading (these rebind the
# zone-level PUcount/DOcount dicts from the earlier cell).
PUcount = dict(zip(df_q1_region['borough'].tolist(), df_q1_region['PUcount'].tolist()))
DOcount = dict(zip(df_q1_region['borough'].tolist(), df_q1_region['DOcount'].tolist()))
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15,8))
ax = plt.subplot(1, 2, 1)
ax.set_title("Boroughs with most pickups")
Regionmap(ax, sf, heat = PUcount)
ax = plt.subplot(1, 2, 2)
ax.set_title("Boroughs with most drop-offs")
Regionmap(ax, sf, heat = DOcount)
In the figure above, we notice that in the first half of 2017 there were more pickups in Queens than in Brooklyn, while the number of drop-offs was similar in both. Yet when it comes to demand, Manhattan beats all the boroughs — and for good reason: it has more outlets, more corporate offices, and more high-end bars, to name a few. This not only makes it the most popular borough, but is also reflected in its exorbitantly high housing prices across New York City.
Thus, if I owned a Yellow Cab taxi company, I would definitely want to start operating in Manhattan rather than the other boroughs. But since Manhattan is crowded and the competition there is high, my second choice as a businessman would be Queens, followed by Brooklyn.
Let's query the database for the exact data that we need.
# Trip counts per hour of day ("time" is the zero-padded hour string "00".."23"),
# for pickups and drop-offs separately, then merged on the hour.
df_pu = pd.read_sql_query('SELECT pickup_hour AS time, count(*) AS PUcount \
FROM table_record \
GROUP BY pickup_hour', nyc_database)
df_do = pd.read_sql_query('SELECT dropoff_hour AS time, count(*) AS DOcount \
FROM table_record \
GROUP BY dropoff_hour', nyc_database)
df_q2 = df_pu.merge(df_do, on="time")
From the selected data, we arrange and show the visualization of the information we need.
# Outer-join against a "00".."23" template so every hour appears even when it
# has no trips, then plot pickups vs drop-offs across the day with Plotly.
template = pd.DataFrame(["{0:0=2d}".format(x) for x in range(0,24)], columns=["time"])
df_q2 = df_q2.join(template.set_index("time"), how = 'outer', on=["time"]).fillna(0) \
.groupby(["time"], as_index=False) \
.agg({'PUcount': 'sum', 'DOcount': 'sum'}) \
.rename(columns = {'PUcount':'Pick-ups', 'DOcount': 'Drop-offs'}) \
.sort_values(by='time')
fig = go.Figure()
fig.add_trace(go.Scatter(x=df_q2['time'], y=df_q2['Pick-ups'], name='Pick Up',
line=dict(color='firebrick', width=4)))
fig.add_trace(go.Scatter(x=df_q2['time'], y=df_q2['Drop-offs'], name='Drop Offs',
line=dict(color='royalblue', width=4)))
fig.update_layout(title='Pick-Ups vs Drop-Offs',
xaxis_title='Time in Hours(24-Hour clock)',
yaxis_title='Count')
fig.show()
Before we start, we should define what short and long distance trips are at first.
To get a closer look at the distribution of trip distance, we select the trip_distance column values and print out its summary statistics.
# Pull only the trip_distance column and print its summary statistics.
df_dist = pd.read_sql_query('SELECT trip_distance FROM table_record', nyc_database)
df_dist['trip_distance'].describe()
The distribution of trip_distance is extremely right-skewed, as shown in the figure below.
# Histogram of trip distance; log-scale counts keep the long right tail visible.
ax = df_dist['trip_distance'].hist(bins=5, figsize=(15,8))
ax.set_yscale('log')
ax.set_xlabel("Distance (Miles)")
ax.set_ylabel("Count")
plt.show()
Given that it takes about 30 miles (roughly the perimeter of New York City) to drive across the whole city, I decided to use 30 miles as the threshold for splitting trips into short-distance and long-distance trips.
# Count trips on either side of the 30-mile short/long threshold.
df_q3_short = pd.read_sql_query('SELECT count(*) AS count FROM table_record \
WHERE trip_distance < 30', nyc_database)
df_q3_long = pd.read_sql_query('SELECT count(*) AS count FROM table_record \
WHERE trip_distance >= 30 ', nyc_database)
print("Short Trips: {} records in total.\nLong Trips: {} records in total."\
.format(df_q3_short.values[0][0], df_q3_long.values[0][0]))
Based on instincts, trips which are longer in nature should have different Pickup/Dropoff time compared to shorter trips. To validate our assumption, we first select temporal information from our database.
# (pickup hour, drop-off hour) pair counts, computed separately for short
# (< 30 mi) and long (>= 30 mi) trips.
df_q3_short = pd.read_sql_query('SELECT pickup_hour AS PUtime, \
dropoff_hour AS DOtime, count(*) AS count \
FROM table_record \
WHERE trip_distance < 30 \
GROUP BY pickup_hour, dropoff_hour', nyc_database)
df_q3_long = pd.read_sql_query('SELECT pickup_hour AS PUtime, \
dropoff_hour AS DOtime, count(*) AS count \
FROM table_record \
WHERE trip_distance >= 30 \
GROUP BY pickup_hour, dropoff_hour', nyc_database)
Lets visualize them
# Merge short/long counts into one frame, then aggregate per pickup hour and
# per drop-off hour for the 24-hour clock plots.
# NOTE(review): the default inner merge keeps only (PUtime, DOtime) pairs that
# occur in BOTH subsets — hour pairs unique to one subset are silently dropped.
# Confirm this is intended.
df_q3 = df_q3_short.merge(df_q3_long, on=["PUtime", "DOtime"], suffixes=["_short", "_long"]) \
.rename(columns={"count_short":"short trips", "count_long":"long trips", \
"PUtime":"pickup time", "DOtime":"dropoff time"})
df_q3_PU = df_q3.groupby(["pickup time"], as_index=False) \
.agg({'short trips': 'sum', 'long trips':'sum'}) \
.sort_values(by="pickup time")
df_q3_DO = df_q3.groupby(["dropoff time"], as_index=False) \
.agg({'short trips': 'sum', 'long trips':'sum'}) \
.sort_values(by="dropoff time")
# Clockplot is a utils helper; called with no arguments, it presumably reads
# df_q3_PU/df_q3_DO from module scope — verify in utils.
Clockplot()
# (pickup zone, drop-off zone) pair counts, again split at the 30-mile mark.
df_q3_short = pd.read_sql_query('SELECT PULocationID, DOLocationID, count(*) AS count \
FROM table_record \
WHERE trip_distance < 30 \
GROUP BY PULocationID, DOLocationID', nyc_database)
df_q3_long = pd.read_sql_query('SELECT PULocationID, DOLocationID, count(*) AS count \
FROM table_record \
WHERE trip_distance >= 30 \
GROUP BY PULocationID, DOLocationID', nyc_database)
After extracting data from database, we then arrange the information and show the top 3 ('pickup zone', 'dropoff zone') pair for both short trips and long trips.
# Merge short/long origin-destination counts and attach zone names for both
# endpoints (each merge drops its helper LocationID join column afterwards).
# NOTE(review): inner merge keeps only OD pairs present in both subsets.
df_q3 = df_q3_short.merge(df_q3_long, on=["PULocationID", "DOLocationID"], suffixes=["_short", "_long"]) \
.rename(columns={"count_short":"short trips", "count_long":"long trips"})
df_q3 = df_q3.merge(df_loc[["LocationID", "zone"]], left_on="PULocationID", right_on="LocationID") \
.drop(['LocationID'], axis=1).rename(columns={"zone":"pickup zone"}) \
.merge(df_loc[["LocationID", "zone"]], left_on="DOLocationID", right_on="LocationID") \
.drop(['LocationID'], axis=1).rename(columns={"zone":"dropoff zone"})
# Top-3 OD pairs for each trip class.
shortTrip_top3 = df_q3.sort_values(by="short trips", ascending=False).head(3)
shortTrip_top3[['pickup zone', 'dropoff zone', 'short trips']]
longTrip_top3 = df_q3.sort_values(by="long trips", ascending=False).head(3)
longTrip_top3[['pickup zone', 'dropoff zone', 'long trips']]
On the other hand, we can also observe the popular zones for short and long trips on map. By aggregating the pickup/dropoff trip count of each zone, we then show the popular pickup/drop-off zones for short trips and long trips.
# Aggregate OD pairs down to per-zone pickup and drop-off totals, pick the
# top-3 zones for each class, and draw the four heat maps.
df_q3_PU = df_q3.groupby("PULocationID", as_index=False).agg({'short trips':'sum', 'long trips':'sum'})
PUtop3_short = df_q3_PU.sort_values(by=['short trips'], ascending=False).set_index("PULocationID").head(3)
PUtop3_long = df_q3_PU.sort_values(by=['long trips'], ascending=False).set_index("PULocationID").head(3)
PUcount_short = dict(zip(df_q3_PU['PULocationID'].tolist(), df_q3_PU['short trips'].tolist()))
PUcount_long = dict(zip(df_q3_PU['PULocationID'].tolist(), df_q3_PU['long trips'].tolist()))
df_q3_DO = df_q3.groupby("DOLocationID", as_index=False).agg({'short trips':'sum', 'long trips':'sum'})
DOtop3_short = df_q3_DO.sort_values(by=['short trips'], ascending=False).set_index("DOLocationID").head(3)
DOtop3_long = df_q3_DO.sort_values(by=['long trips'], ascending=False).set_index("DOLocationID").head(3)
DOcount_short = dict(zip(df_q3_DO['DOLocationID'].tolist(), df_q3_DO['short trips'].tolist()))
DOcount_long = dict(zip(df_q3_DO['DOLocationID'].tolist(), df_q3_DO['long trips'].tolist()))
# 2x2 grid: pickups (top row) and drop-offs (bottom row), short vs long.
fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(18,18))
ax = plt.subplot(2, 2, 1)
ax.set_title("Zones with most pickups for Short Trips")
Zonemap(ax, sf, heat=PUcount_short, text=PUtop3_short.index.tolist())
ax = plt.subplot(2, 2, 2)
ax.set_title("Zones with most pickups for Long Trips")
Zonemap(ax, sf, heat=PUcount_long, text=PUtop3_long.index.tolist())
ax = plt.subplot(2, 2, 3)
ax.set_title("Zones with most drop-offs for Short Trips")
Zonemap(ax, sf, heat=DOcount_short, text=DOtop3_short.index.tolist())
ax = plt.subplot(2, 2, 4)
ax.set_title("Zones with most drop-offs for Long Trips")
Zonemap(ax, sf, heat=DOcount_long, text=DOtop3_long.index.tolist())
Lastly, we compare short trips and long trips in some other aspects.
# Compare short vs long trips on categorical attributes, shown as proportions.
# PlotSLD is a utils helper; the loop body is re-indented here because the
# notebook export stripped indentation.
for attr in ["passenger_count", "RatecodeID", "payment_type"]:
    PlotSLD(attr, rpr="proportion", kind='bar')
Take note that for the case of model development, we shall be using data of all the months for year 2017.
# --- Spark setup and feature extraction ------------------------------------
SparkContext.setSystemProperty('spark.executor.memory', '8g')
sc = SparkContext("local", "App Name")
spark = SparkSession(sc)
# Load the full-year extract written earlier; cached because it is scanned
# several times below.
df_yellow_temp = spark.read.csv('nyc_database.csv', header=True)
df_yellow_temp = df_yellow_temp.cache()
df_yellow_temp.rdd.getNumPartitions()
# Keep only the modelling columns and derive calendar features from the
# pickup timestamp.
# Fix: year/month/dayofmonth/hour live in pyspark.sql.functions (imported as
# `f` at the top of the file); unqualified they raise NameError.
df_yellow_temp = df_yellow_temp.select(df_yellow_temp.passenger_count,
                                       df_yellow_temp.trip_distance,
                                       df_yellow_temp.PULocationID,
                                       df_yellow_temp.DOLocationID,
                                       df_yellow_temp.total_amount,
                                       f.year(df_yellow_temp.tpep_pickup_datetime).alias('Year'),
                                       f.month(df_yellow_temp.tpep_pickup_datetime).alias('Month'),
                                       f.dayofmonth(df_yellow_temp.tpep_pickup_datetime).alias('Day'),
                                       f.hour(df_yellow_temp.tpep_pickup_datetime).alias('Hour'))
df_yellow_temp.show()
df_yellow_temp = df_yellow_temp.cache()
# null values in each column (sanity check before modelling)
data_agg = df_yellow_temp.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in df_yellow_temp.columns])
data_agg.show()
# Hourly, per-pickup-zone aggregates: passenger demand and gross revenue.
# CSV columns arrive as strings, hence the explicit float casts before sum().
df_pass = df_yellow_temp.withColumn('passenger_count', df_yellow_temp.passenger_count.cast('float')).groupby('Month','Day','Hour','PULocationID').sum('passenger_count')
df_pass = df_pass.cache()
df_amount = df_yellow_temp.withColumn('total_amount', df_yellow_temp.total_amount.cast('float')).groupby('Month','Day','Hour','PULocationID').sum('total_amount')
df_amount = df_amount.cache()
The Dataset has no missing values. After grouping by, we now have two different dataframes:
Note: String Indexer, One Hot Encoder and Vector Assembly operations will be implemented in the Modelling section
In this section, I will train different Machine Learning Regression algorithms to address two challenges, which are, Prediction of Passenger Demand based on Time and Location and Prediction of Gross Revenue based on Time and Location.
The following image depicts the general flow of our pipeline:
As mentioned in the figure, after Feature Engineering the intuition is to train the data on all the Regressors mentioned, with a 70/30 Train/Test split and 3-fold cross-validation. For Hyperparameter Optimization, I will implement Grid Search; the pySpark equivalent of Scikit-Learn's GridSearchCV is ParamGridBuilder. For the evaluation metrics, I will use RMSE and the R² Score to compare and contrast the most robust models among the four. Due to the linear nature of the input and output variables, the R² Score is a valid evaluation metric in this scenario.
The following figure depicts the overall pipeline of the Modelling Section.


Independent Variables / Features / Input :
Dependent Variable / Target / Output :
StringIndexer encodes a string column of labels to a column of label indices. StringIndexer can encode multiple columns.
The input variables or Features or Independent Variables to this model are Hour, Day, Month and Pickup Location whereas the Output or Target Variable or the Dependent Variable is Passenger Count. Since they are all numeric in nature, using String Indexer, they will be converted from String type to Numeric type.
# create object of StringIndexer class and specify input and output column
# StringIndexer re-encodes each column's distinct values as frequency-ranked
# label indices (most frequent value -> 0.0). Month/Day/Hour/PULocationID
# become the model features.
# NOTE(review): indexing the target 'sum(passenger_count)' maps each distinct
# sum to an arbitrary frequency-ranked index rather than preserving its
# numeric magnitude — a float cast would be the usual choice for a regression
# label; confirm this is intended.
df_month = StringIndexer(inputCol='Month',outputCol='pass_month')
df_day = StringIndexer(inputCol='Day',outputCol='pass_day')
df_hour = StringIndexer(inputCol='Hour',outputCol='pass_hour')
df_puloc = StringIndexer(inputCol='PULocationID',outputCol='pass_pickup')
df_passenger = StringIndexer(inputCol='sum(passenger_count)',outputCol='passenger_count')
# transform the data
df_pass = df_month.fit(df_pass).transform(df_pass)
df_pass = df_day.fit(df_pass).transform(df_pass)
df_pass = df_hour.fit(df_pass).transform(df_pass)
df_pass = df_puloc.fit(df_pass).transform(df_pass)
df_pass = df_passenger.fit(df_pass).transform(df_pass)
# view the transformed data
df_pass.select('pass_month', 'pass_day', 'pass_hour', 'pass_pickup', 'passenger_count').show(10)
Categorical data often must be encoded when working with machine learning algorithms.
Specifically:
Pickup Location is categorical in nature. Thus OHE will be applied to it
# create object and specify input and output column
# One-hot encode the pickup-zone index: zones are nominal categories, so a
# sparse indicator vector avoids implying an ordering between zone IDs.
OHE = OneHotEncoderEstimator(inputCols=['pass_pickup'],outputCols=['pass_pickup_OHE'])
# transform the data
df_pass = OHE.fit(df_pass).transform(df_pass)
# view and transform the data
df_pass.select('pass_month', 'pass_day', 'pass_hour', 'pass_pickup_OHE', 'passenger_count').show(10)
As mentioned earlier in the Feature Engineering section, this step is crucial when implementing Machine Learning in pySpark
# specify the input and output columns of the vector assembler
# VectorAssembler packs the feature columns into the single 'features' vector
# column that Spark ML estimators require.
assembler = VectorAssembler(inputCols=['pass_month',
'pass_day',
'pass_hour',
'pass_pickup_OHE'],
outputCol='features')
# transform the data
final_data = assembler.transform(df_pass)
# view the transformed vector
# Keep only the (features, label) pair used by the model-1 regressors below.
final_data = final_data.select(['features', 'passenger_count'])
In the next sub sections, I will be training 4 machine learning algorithms, in my case, Regressors, which are mentioned below:
The implementation of machine learning in pyspark is similar to tensorflow with small modifications. The following picture depicts the flow of code execution:

Linear regression is a basic and commonly used type of predictive analysis. The overall idea of regression is to examine two things:
These regression estimates are used to explain the relationship between one dependent variable and one or more independent variables.
Parameters Tuned:
# --- Model 1a: Linear Regression on passenger demand -----------------------
# 70/30 train/test split.
splits = final_data.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
# Define last stage for model 1: Linear regression model
start = time.time()
lr = LinearRegression(featuresCol='features',
                      labelCol='passenger_count')
evaluator = RegressionEvaluator(labelCol="passenger_count", predictionCol="prediction", metricName="rmse")
# Hyperparameter grid: iteration budget, L2 strength, elastic-net mixing.
grid = ParamGridBuilder().addGrid(lr.maxIter, [100,200,500]) \
                         .addGrid(lr.regParam, [0,0.3,0.5]) \
                         .addGrid(lr.elasticNetParam, [0.5,1]) \
                         .build()
lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=grid,
                       evaluator=evaluator, numFolds=3)
# fit the pipeline for the training data
model = lr_cv.fit(train_df)
# transform the data (adds a 'prediction' column to the training frame)
train_df = model.transform(train_df)
end = time.time()
hours, rem = divmod(end-start, 3600)
minutes, seconds = divmod(rem, 60)
lr_1_time = minutes  # NOTE(review): drops whole hours; fine only for sub-hour runs
# Train metrics: R^2 (reported as a percentage) and RMSE.
lr_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                         labelCol="passenger_count", metricName="r2")
# Fix: this originally evaluated with rf_evaluator_train — the random-forest
# evaluator, which is not defined at this point in the notebook.
lr_1_r2_train = lr_evaluator_train.evaluate(train_df)*100
lr_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                         labelCol="passenger_count", metricName="rmse")
lr_1_rmse_train = lr_evaluator_train.evaluate(train_df)
# Test metrics.
lr_predictions = model.transform(test_df)
lr_predictions.select("prediction","passenger_count","features").show(5)
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="passenger_count", metricName="r2")
lr_1_r2_test = lr_evaluator.evaluate(lr_predictions)*100
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="passenger_count", metricName="rmse")
lr_1_rmse_test = lr_evaluator.evaluate(lr_predictions)
# Winning hyperparameters found by the CV grid search.
bestModel = model.bestModel
print('Best Param (regParam): ', bestModel._java_obj.getRegParam())
print('Best Param (MaxIter): ', bestModel._java_obj.getMaxIter())
print('Best Param (elasticNetParam): ', bestModel._java_obj.getElasticNetParam())
# Persist the fitted CrossValidatorModel.
path = '/home/msds2020/jbasak/bdcc/spark-exercises/models/linear_regression/'
model.write().overwrite().save(path)
Decision Trees can be summarized with the below bullet points:
A decision tree is arriving at an estimate by asking a series of questions to the data, each question narrowing our possible values until the model get confident enough to make a single prediction. The order of the question as well as their content are being determined by the model. In addition, the questions asked are all in a True/False form.
This is a little tough to grasp because it is not how humans naturally think, and perhaps the best way to show this difference is to create a real decision tree from.
Parameters Tuned:
# --- Model 1b: Decision Tree on passenger demand ---------------------------
# Fresh 70/30 split (note: each model re-splits, so folds differ per model).
splits = final_data.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
# Define last stage for model 1: Decision Tree Regressor model
start = time.time()
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'passenger_count')
dt_evaluator = RegressionEvaluator(labelCol="passenger_count",
predictionCol="prediction",
metricName="rmse")
# Only tree depth is tuned for the decision tree.
dt_grid = ParamGridBuilder().addGrid(dt.maxDepth, [5,10,14]).build()
dt_cv = CrossValidator(estimator=dt, estimatorParamMaps=dt_grid, \
evaluator=dt_evaluator, numFolds=3)
# fit the pipeline for the training data
dt_model = dt_cv.fit(train_df)
# transform the data (adds the 'prediction' column)
train_df = dt_model.transform(train_df)
end = time.time()
hours, rem = divmod(end-start, 3600)
minutes, seconds = divmod(rem, 60)
dt_1_time = minutes
# Train metrics: R^2 (as a percentage) and RMSE.
dt_evaluator_train = RegressionEvaluator(predictionCol="prediction", \
labelCol="passenger_count",metricName="r2")
dt_1_r2_train = dt_evaluator_train.evaluate(train_df)*100
dt_evaluator_train = RegressionEvaluator(predictionCol="prediction", \
labelCol="passenger_count",metricName="rmse")
dt_1_rmse_train = dt_evaluator_train.evaluate(train_df)
# Test metrics.
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(predictionCol="prediction", \
labelCol="passenger_count",metricName="r2")
dt_1_r2_test = dt_evaluator.evaluate(dt_predictions)*100
dt_evaluator = RegressionEvaluator(predictionCol="prediction", \
labelCol="passenger_count",metricName="rmse")
dt_1_rmse_test = dt_evaluator.evaluate(dt_predictions)
# Best depth from the CV search, then persist the model.
dt_bestModel = dt_model.bestModel
print('Best Param: ', dt_bestModel._java_obj.getMaxDepth())
path = '/home/msds2020/jbasak/bdcc/spark-exercises/models/decision_tree/'
dt_model.write().overwrite().save(path)
Gradient boosting is a machine learning technique for regression and classification problems, which produces a prediction model in the form of an ensemble of weak prediction models, typically decision trees.
"Boosting" in machine learning is a way of combining multiple simple models into a single composite model. This is also why boosting is known as an additive model, since simple models (also known as weak learners) are added one at a time, while keeping existing trees in the model unchanged. As we combine more and more simple models, the complete final model becomes a stronger predictor. The term "gradient" in "gradient boosting" comes from the fact that the algorithm uses gradient descent to minimize the loss.
Parameters Tuned:
# --- Model 1c: Gradient-Boosted Trees on passenger demand ------------------
splits = final_data.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
# Define last stage for model 1: Gradient Boosting Regressor model
start = time.time()
gbt = GBTRegressor(featuresCol='features',
                   labelCol='passenger_count')
gbt_evaluator = RegressionEvaluator(labelCol="passenger_count",
                                    predictionCol="prediction",
                                    metricName="rmse")
# Grid over boosting iterations and tree depth.
gbt_grid = ParamGridBuilder().addGrid(gbt.maxIter, [10,20,30]) \
                             .addGrid(gbt.maxDepth, [5,10,15]) \
                             .build()
gbt_cv = CrossValidator(estimator=gbt, estimatorParamMaps=gbt_grid,
                        evaluator=gbt_evaluator, numFolds=3)
# fit the pipeline for the training data
gbt_model = gbt_cv.fit(train_df)
# transform the data
train_df = gbt_model.transform(train_df)
end = time.time()
hours, rem = divmod(end-start, 3600)
minutes, seconds = divmod(rem, 60)
gbt_1_time = minutes
# Train metrics: R^2 (as a percentage) and RMSE.
gbt_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                          labelCol="passenger_count", metricName="r2")
gbt_1_r2_train = gbt_evaluator_train.evaluate(train_df)*100
# Fix: this evaluator was assigned to a misspelled name (gt_evaluator_train),
# so the RMSE line below silently reused the R^2 evaluator instead.
gbt_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                          labelCol="passenger_count", metricName="rmse")
gbt_1_rmse_train = gbt_evaluator_train.evaluate(train_df)
# Test metrics.
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select("prediction","passenger_count","features").show(5)
gbt_evaluator = RegressionEvaluator(predictionCol="prediction",
                                    labelCol="passenger_count", metricName="r2")
# Fix: *100 added so this R^2 is a percentage like every other model's score.
gbt_1_r2_test = gbt_evaluator.evaluate(gbt_predictions)*100
gbt_evaluator = RegressionEvaluator(predictionCol="prediction",
                                    labelCol="passenger_count", metricName="rmse")
gbt_1_rmse_test = gbt_evaluator.evaluate(gbt_predictions)
gbt_bestModel = gbt_model.bestModel
# Fix: GBTRegressor has no regParam — report the parameters that were tuned.
print('Best Param (maxDepth): ', gbt_bestModel._java_obj.getMaxDepth())
print('Best Param (MaxIter): ', gbt_bestModel._java_obj.getMaxIter())
path = '/home/msds2020/jbasak/bdcc/spark-exercises/models/gradient_boosting/'
gbt_model.write().overwrite().save(path)
Random forest regression is a Supervised Learning algorithm which uses ensemble learning method for regression. Random forest is a bagging technique and not a boosting technique. The trees in random forests are run in parallel. There is no interaction between these trees while building the trees. It operates by constructing a multitude of decision trees at training time and outputting the mean prediction of the individual trees. A random forest is a meta-estimator (i.e. it combines the result of multiple predictions) which aggregates many decision trees, with some helpful modifications:
Parameters Tuned:
# --- Model 1d: Random Forest on passenger demand ---------------------------
splits = final_data.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
# Define last stage for model 1: Random Forest model
start = time.time()
rf = RandomForestRegressor(featuresCol = 'features',
labelCol = 'passenger_count')
rf_evaluator = RegressionEvaluator(labelCol="passenger_count",
predictionCol="prediction",
metricName="rmse")
# Grid over forest size and tree depth.
rf_grid = ParamGridBuilder().addGrid(rf.numTrees, [65,80,100,120]) \
.addGrid(rf.maxDepth, [5,10,15]) \
.build()
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=rf_grid, \
evaluator=rf_evaluator, numFolds=3)
# fit the pipeline for the training data
rf_model = rf_cv.fit(train_df)
# transform the data (adds the 'prediction' column)
train_df = rf_model.transform(train_df)
end = time.time()
hours, rem = divmod(end-start, 3600)
minutes, seconds = divmod(rem, 60)
rf_1_time = minutes
# Train metrics: R^2 (as a percentage) and RMSE.
rf_evaluator_train = RegressionEvaluator(predictionCol="prediction", \
labelCol="passenger_count",metricName="r2")
rf_1_r2_train = rf_evaluator_train.evaluate(train_df)*100
rf_evaluator_train = RegressionEvaluator(predictionCol="prediction", \
labelCol="passenger_count",metricName="rmse")
rf_1_rmse_train = rf_evaluator_train.evaluate(train_df)
# Test metrics.
rf_predictions = rf_model.transform(test_df)
rf_evaluator = RegressionEvaluator(predictionCol="prediction", \
labelCol="passenger_count",metricName="r2")
rf_1_r2_test = rf_evaluator.evaluate(rf_predictions)*100
rf_evaluator = RegressionEvaluator(predictionCol="prediction", \
labelCol="passenger_count",metricName="rmse")
rf_1_rmse_test = rf_evaluator.evaluate(rf_predictions)
# Best hyperparameters (read from the underlying Java model), then persist.
rf_bestModel = rf_model.bestModel
print('Best Param (numTrees): ', rf_bestModel._java_obj.numTrees())
print('Best Param (maxDepth): ', rf_bestModel._java_obj.maxDepth())
path = '/home/msds2020/jbasak/bdcc/spark-exercises/models/random_forest/'
rf_model.write().overwrite().save(path)

Refer to previous section for explanation.
The input variables or Features or Independent Variables to this model are Hour, Day, Month and Pickup Location whereas the Output or Target Variable or the Dependent Variable is Total Amount.
# create object of StringIndexer class and specify input and output column
# Same indexing pipeline as model 1, applied to the revenue frame df_amount.
# NOTE(review): indexing the continuous target 'sum(total_amount)' replaces
# each distinct sum with a frequency-ranked index, losing its magnitude — a
# float cast would be the usual choice for a regression label; confirm.
df_month = StringIndexer(inputCol='Month',outputCol='pass_month')
df_day = StringIndexer(inputCol='Day',outputCol='pass_day')
df_hour = StringIndexer(inputCol='Hour',outputCol='pass_hour')
df_puloc = StringIndexer(inputCol='PULocationID',outputCol='pass_pickup')
df_amt = StringIndexer(inputCol='sum(total_amount)',outputCol='total_amount')
# transform the data
df_amount = df_month.fit(df_amount).transform(df_amount)
df_amount = df_day.fit(df_amount).transform(df_amount)
df_amount = df_hour.fit(df_amount).transform(df_amount)
df_amount = df_puloc.fit(df_amount).transform(df_amount)
df_amount = df_amt.fit(df_amount).transform(df_amount)
# view the transformed data
df_amount.select('pass_month', 'pass_day', 'pass_hour', 'pass_pickup', 'total_amount').show(10)
Refer to previous section for explanation.
# create object and specify input and output column
# One-hot encode the pickup-zone index for the revenue model.
OHE = OneHotEncoderEstimator(inputCols=['pass_pickup'],outputCols=['pass_pickup_OHE'])
# transform the data
# Fix: assign the encoded frame back to df_amount — the original stored it in
# df_pass, which both clobbered model 1's df_pass and left df_amount without
# the pass_pickup_OHE column that the VectorAssembler below requires.
df_amount = OHE.fit(df_amount).transform(df_amount)
# view the transformed data
df_amount.select('pass_month', 'pass_day', 'pass_hour', 'pass_pickup_OHE', 'total_amount').show(10)
Refer to previous section for explanation.
# specify the input and output columns of the vector assembler
# Pack the revenue-model features into the single 'features' vector column.
assembler = VectorAssembler(inputCols=['pass_month',
'pass_day',
'pass_hour',
'pass_pickup_OHE'],
outputCol='features')
# transform the data
final_data_amount = assembler.transform(df_amount)
# view the transformed vector
# Keep only the (features, label) pair used by the model-2 regressors below.
final_data_amount = final_data_amount.select(['features', 'total_amount'])
# --- Model 2a: Linear Regression on gross revenue --------------------------
# Fix: split the revenue dataset final_data_amount — the original split
# final_data, whose only label column is passenger_count, not total_amount.
splits = final_data_amount.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
# Define last stage for model 2: Linear regression model
start = time.time()
lr_amt = LinearRegression(featuresCol='features',
                          labelCol='total_amount')
lr_amt_evaluator = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction", metricName="rmse")
# Fix: the grid must reference lr_amt's params — it was built from lr.*, the
# model-1 estimator, so the CV search tuned the wrong estimator's parameters.
lr_amt_grid = ParamGridBuilder().addGrid(lr_amt.maxIter, [100,200,500]) \
                                .addGrid(lr_amt.regParam, [0,0.3,0.5]) \
                                .addGrid(lr_amt.elasticNetParam, [0.5,1]) \
                                .build()
lr_amt_cv = CrossValidator(estimator=lr_amt, estimatorParamMaps=lr_amt_grid,
                           evaluator=lr_amt_evaluator, numFolds=3)
# fit the pipeline for the training data
model_amt = lr_amt_cv.fit(train_df)
# transform the data (adds the 'prediction' column)
train_df = model_amt.transform(train_df)
end = time.time()
hours, rem = divmod(end-start, 3600)
minutes, seconds = divmod(rem, 60)
lr_2_time = minutes
# Train metrics: R^2 (as a percentage) and RMSE.
lr_amt_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                             labelCol="total_amount", metricName="r2")
lr_2_r2_train = lr_amt_evaluator_train.evaluate(train_df)*100
lr_amt_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                             labelCol="total_amount", metricName="rmse")
lr_2_rmse_train = lr_amt_evaluator_train.evaluate(train_df)
# Test metrics.
lr_amt_predictions = model_amt.transform(test_df)
lr_amt_evaluator = RegressionEvaluator(predictionCol="prediction",
                                       labelCol="total_amount", metricName="r2")
lr_2_r2_test = lr_amt_evaluator.evaluate(lr_amt_predictions)*100
lr_amt_evaluator = RegressionEvaluator(predictionCol="prediction",
                                       labelCol="total_amount", metricName="rmse")
lr_2_rmse_test = lr_amt_evaluator.evaluate(lr_amt_predictions)
# Winning hyperparameters from the CV search.
amt_bestModel = model_amt.bestModel
print('Best Param (regParam): ', amt_bestModel._java_obj.getRegParam())
print('Best Param (MaxIter): ', amt_bestModel._java_obj.getMaxIter())
print('Best Param (elasticNetParam): ', amt_bestModel._java_obj.getElasticNetParam())
path = '/home/msds2020/jbasak/bdcc/spark-exercises/models/linear_regression/model_2/'
# Fix: save the revenue model (model_amt); the original saved model 1's `model`.
model_amt.write().overwrite().save(path)
# Fresh 70/30 split for the Decision Tree model.
# BUG FIX: split final_data_amount; 'final_data' is not defined in this section.
splits = final_data_amount.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
# Model 2 (gross revenue): Decision Tree Regressor, tuned by 3-fold CV.
start = time.time()
dt_amt = DecisionTreeRegressor(featuresCol='features', labelCol='total_amount')
dt_amt_evaluator = RegressionEvaluator(labelCol="total_amount",
                                       predictionCol="prediction",
                                       metricName="rmse")
dt_amt_grid = ParamGridBuilder().addGrid(dt_amt.maxDepth, [5, 10, 14]).build()
dt_amt_cv = CrossValidator(estimator=dt_amt, estimatorParamMaps=dt_amt_grid,
                           evaluator=dt_amt_evaluator, numFolds=3)
# BUG FIX: fit this model's cross-validator (dt_amt_cv); 'dt_cv' is undefined.
dt_amt_model = dt_amt_cv.fit(train_df)
# Append predictions to the training data for the metrics below.
train_df = dt_amt_model.transform(train_df)
end = time.time()
hours, rem = divmod(end - start, 3600)
minutes, seconds = divmod(rem, 60)
# BUG FIX: include whole hours in the reported minutes (was silently dropped).
dt_2_time = hours * 60 + minutes
# Train-set metrics: R^2 (as a percentage) and RMSE.
dt_amt_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                             labelCol="total_amount", metricName="r2")
dt_2_r2_train = dt_amt_evaluator_train.evaluate(train_df) * 100
dt_amt_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                             labelCol="total_amount", metricName="rmse")
dt_2_rmse_train = dt_amt_evaluator_train.evaluate(train_df)
# BUG FIX: transform with the fitted model; a CrossValidator has no transform().
dt_amt_predictions = dt_amt_model.transform(test_df)
dt_amt_evaluator = RegressionEvaluator(predictionCol="prediction",
                                       labelCol="total_amount", metricName="r2")
dt_2_r2_test = dt_amt_evaluator.evaluate(dt_amt_predictions) * 100
# BUG FIX: evaluate against this model's label 'total_amount', not
# 'passenger_count' (copy-paste from model 1).
dt_amt_evaluator = RegressionEvaluator(predictionCol="prediction",
                                       labelCol="total_amount", metricName="rmse")
dt_2_rmse_test = dt_amt_evaluator.evaluate(dt_amt_predictions)
dt_amt_bestModel = dt_amt_model.bestModel
# BUG FIX: the Java getter is getMaxDepth(), not getmaxDepth().
print('Best Param (maxDepth): ', dt_amt_bestModel._java_obj.getMaxDepth())
path = '/home/msds2020/jbasak/bdcc/spark-exercises/models/decision_tree/model_2/'
# BUG FIX: save dt_amt_model; 'dt_model' is undefined in this section.
dt_amt_model.write().overwrite().save(path)
# Fresh 70/30 split for the Gradient Boosting model.
# BUG FIX: split final_data_amount; 'final_data' is not defined in this section.
splits = final_data_amount.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
# Model 2 (gross revenue): Gradient Boosted Trees Regressor, tuned by 3-fold CV.
start = time.time()
gbt_amt = GBTRegressor(featuresCol='features',
                       labelCol='total_amount',
                       maxIter=10)
gbt_amt_evaluator = RegressionEvaluator(labelCol="total_amount",
                                        predictionCol="prediction",
                                        metricName="rmse")
# BUG FIX: the grid must reference this model's params (gbt_amt); 'gbt' is
# the other model's estimator and is undefined here.
gbt_amt_grid = ParamGridBuilder().addGrid(gbt_amt.maxIter, [100, 200, 500]) \
                                 .addGrid(gbt_amt.maxDepth, [5, 10, 15]) \
                                 .build()
gbt_amt_cv = CrossValidator(estimator=gbt_amt, estimatorParamMaps=gbt_amt_grid,
                            evaluator=gbt_amt_evaluator, numFolds=3)
# Fit the cross-validated model on the training data.
gbt_amt_model = gbt_amt_cv.fit(train_df)
# Append predictions to the training data for the metrics below.
train_df = gbt_amt_model.transform(train_df)
end = time.time()
hours, rem = divmod(end - start, 3600)
minutes, seconds = divmod(rem, 60)
# BUG FIX: include whole hours in the reported minutes (was silently dropped).
gbt_2_time = hours * 60 + minutes
# Train-set metrics: R^2 (as a percentage) and RMSE.
gbt_amt_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                              labelCol="total_amount", metricName="r2")
gbt_2_r2_train = gbt_amt_evaluator_train.evaluate(train_df) * 100
gbt_amt_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                              labelCol="total_amount", metricName="rmse")
gbt_2_rmse_train = gbt_amt_evaluator_train.evaluate(train_df)
# BUG FIX: transform with this model's fitted CV model; 'gbt_model' belongs to
# the other model and is undefined in this section.
gbt_amt_predictions = gbt_amt_model.transform(test_df)
gbt_amt_evaluator = RegressionEvaluator(predictionCol="prediction",
                                        labelCol="total_amount", metricName="r2")
gbt_2_r2_test = gbt_amt_evaluator.evaluate(gbt_amt_predictions) * 100
gbt_amt_evaluator = RegressionEvaluator(predictionCol="prediction",
                                        labelCol="total_amount", metricName="rmse")
gbt_2_rmse_test = gbt_amt_evaluator.evaluate(gbt_amt_predictions)
gbt_amt_bestModel = gbt_amt_model.bestModel
# BUG FIX: the Java getter is getMaxDepth(), not getmaxDepth().
print('Best Param (maxDepth): ', gbt_amt_bestModel._java_obj.getMaxDepth())
print('Best Param (MaxIter): ', gbt_amt_bestModel._java_obj.getMaxIter())
path = '/home/msds2020/jbasak/bdcc/spark-exercises/models/gradient_boosting/model_2/'
gbt_amt_model.write().overwrite().save(path)
# Fresh 70/30 split for the Random Forest model.
# BUG FIX: split final_data_amount; 'final_data' is not defined in this section.
splits = final_data_amount.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
# Model 2 (gross revenue): Random Forest Regressor, tuned by 3-fold CV.
# (Original comment wrongly said "model 1: logistic regression".)
start = time.time()
rf_amt = RandomForestRegressor(featuresCol='features',
                               labelCol='total_amount')
rf_amt_evaluator = RegressionEvaluator(labelCol="total_amount",
                                       predictionCol="prediction",
                                       metricName="rmse")
rf_amt_grid = ParamGridBuilder().addGrid(rf_amt.numTrees, [65, 80, 100, 120]) \
                                .addGrid(rf_amt.maxDepth, [5, 10, 15]) \
                                .build()
rf_amt_cv = CrossValidator(estimator=rf_amt, estimatorParamMaps=rf_amt_grid,
                           evaluator=rf_amt_evaluator, numFolds=3)
# Fit the cross-validated model on the training data.
rf_amt_model = rf_amt_cv.fit(train_df)
# Append predictions to the training data for the metrics below.
train_df = rf_amt_model.transform(train_df)
end = time.time()
hours, rem = divmod(end - start, 3600)
minutes, seconds = divmod(rem, 60)
# BUG FIX: include whole hours in the reported minutes (was silently dropped).
rf_2_time = hours * 60 + minutes
# Train-set metrics: R^2 (as a percentage) and RMSE.
rf_amt_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                             labelCol="total_amount", metricName="r2")
rf_2_r2_train = rf_amt_evaluator_train.evaluate(train_df) * 100
rf_amt_evaluator_train = RegressionEvaluator(predictionCol="prediction",
                                             labelCol="total_amount", metricName="rmse")
rf_2_rmse_train = rf_amt_evaluator_train.evaluate(train_df)
# Test-set metrics on the held-out split.
rf_amt_predictions = rf_amt_model.transform(test_df)
rf_amt_evaluator = RegressionEvaluator(predictionCol="prediction",
                                       labelCol="total_amount", metricName="r2")
rf_2_r2_test = rf_amt_evaluator.evaluate(rf_amt_predictions) * 100
rf_amt_evaluator = RegressionEvaluator(predictionCol="prediction",
                                       labelCol="total_amount", metricName="rmse")
rf_2_rmse_test = rf_amt_evaluator.evaluate(rf_amt_predictions)
rf_amt_bestModel = rf_amt_model.bestModel
# BUG FIX: the Java getters are getNumTrees() and getMaxDepth(); the original
# lowercase names do not exist and would raise at runtime.
print('Best Param (numTrees): ', rf_amt_bestModel._java_obj.getNumTrees())
print('Best Param (maxDepth): ', rf_amt_bestModel._java_obj.getMaxDepth())
path = '/home/msds2020/jbasak/bdcc/spark-exercises/models/random_forest/model_2/'
rf_amt_model.write().overwrite().save(path)
# Summary table for Model 1 (passenger-count demand): header row followed by
# one row of metrics per algorithm.
table_1 = [['Algorithm', 'R² Score - Train', 'R² Score - Test',
            'RMSE - Train', 'RMSE - Test', 'Time Taken<br>(in minutes)'],
           ['Linear<br>Regression', lr_1_r2_train, lr_1_r2_test, lr_1_rmse_train, lr_1_rmse_test, lr_1_time],
           # BUG FIX: display typo 'Treee' -> 'Tree'
           ['Decision<br>Tree', dt_1_r2_train, dt_1_r2_test, dt_1_rmse_train, dt_1_rmse_test, dt_1_time],
           ['Gradient<br>Boosting', gbt_1_r2_train, gbt_1_r2_test, gbt_1_rmse_train, gbt_1_rmse_test, gbt_1_time],
           ['Random<br>Forest', rf_1_r2_train, rf_1_r2_test, rf_1_rmse_train, rf_1_rmse_test, rf_1_time]]
# Render the table as a plotly figure.
fig = ff.create_table(table_1, height_constant=600)
fig.update_layout(
    width=985, height=500,
    # BUG FIX: title typo 'Predition' -> 'Prediction'
    title_text='Model 1: Passenger Count Demand Prediction Statistics',
    margin={'t': 50, 'b': 100},
)
fig.show()
# Summary table for Model 2 (gross revenue): header row followed by one row of
# metrics per algorithm.
table_2 = [['Algorithm', 'R² Score - Train', 'R² Score - Test',
            'RMSE - Train', 'RMSE - Test', 'Time Taken<br>(in minutes)'],
           ['Linear<br>Regression', lr_2_r2_train, lr_2_r2_test,
            lr_2_rmse_train, lr_2_rmse_test, lr_2_time],
           # BUG FIX: display typo 'Treee' -> 'Tree'
           ['Decision<br>Tree', dt_2_r2_train, dt_2_r2_test,
            dt_2_rmse_train, dt_2_rmse_test, dt_2_time],
           ['Gradient<br>Boosting', gbt_2_r2_train, gbt_2_r2_test,
            gbt_2_rmse_train, gbt_2_rmse_test, gbt_2_time],
           ['Random<br>Forest', rf_2_r2_train, rf_2_r2_test,
            rf_2_rmse_train, rf_2_rmse_test, rf_2_time]]
# Render the table as a plotly figure.
fig = ff.create_table(table_2, height_constant=600)
fig.update_layout(
    width=985, height=500,
    # BUG FIX: title typo 'Predition' -> 'Prediction'
    title_text='Model 2: Gross Revenue Demand Prediction Statistics',
    margin={'t': 50, 'b': 100},
)
fig.show()
# BUG FIX: make_subplots is never imported anywhere in this notebook's imports
# (only plotly.express/graph_objs/figure_factory/offline are), so this cell
# would raise NameError. Import it here.
from plotly.subplots import make_subplots
# 2x2 grid of grouped bar charts comparing the four algorithms:
# row 1 = Model 1 (R² | RMSE), row 2 = Model 2 (R² | RMSE).
fig = make_subplots(
    rows=2, cols=2,
    specs=[[{"type": "bar"}, {"type": "bar"}],
           [{"type": "bar"}, {"type": "bar"}]],
    subplot_titles=("Model 1: R² Score", "Model 1: RMSE",
                    "Model 2: R² Score", "Model 2: RMSE")
)
# Shared x-axis categories (one per algorithm).
x_data = ['Linear<br>Regression',
          'Decision<br>Tree',
          'Gradient<br>Boosting',
          'Random<br>Forest']
# Model 1: R² score, train vs test.
fig.add_trace(go.Bar(x=x_data, y=[lr_1_r2_train,
                                  dt_1_r2_train,
                                  gbt_1_r2_train,
                                  rf_1_r2_train], name='Train'),
              row=1, col=1)
fig.add_trace(go.Bar(x=x_data, y=[lr_1_r2_test,
                                  dt_1_r2_test,
                                  gbt_1_r2_test,
                                  rf_1_r2_test], name='Test'),
              row=1, col=1)
# Model 1: RMSE, train vs test.
fig.add_trace(go.Bar(x=x_data, y=[lr_1_rmse_train,
                                  dt_1_rmse_train,
                                  gbt_1_rmse_train,
                                  rf_1_rmse_train], name='Train'),
              row=1, col=2)
fig.add_trace(go.Bar(x=x_data, y=[lr_1_rmse_test,
                                  dt_1_rmse_test,
                                  gbt_1_rmse_test,
                                  rf_1_rmse_test], name='Test'),
              row=1, col=2)
# Model 2: R² score, train vs test.
fig.add_trace(go.Bar(x=x_data, y=[lr_2_r2_train,
                                  dt_2_r2_train,
                                  gbt_2_r2_train,
                                  rf_2_r2_train], name='Train'),
              row=2, col=1)
fig.add_trace(go.Bar(x=x_data, y=[lr_2_r2_test,
                                  dt_2_r2_test,
                                  gbt_2_r2_test,
                                  rf_2_r2_test], name='Test'),
              row=2, col=1)
# Model 2: RMSE, train vs test.
fig.add_trace(go.Bar(x=x_data, y=[lr_2_rmse_train,
                                  dt_2_rmse_train,
                                  gbt_2_rmse_train,
                                  rf_2_rmse_train], name='Train'),
              row=2, col=2)
fig.add_trace(go.Bar(x=x_data, y=[lr_2_rmse_test,
                                  dt_2_rmse_test,
                                  gbt_2_rmse_test,
                                  rf_2_rmse_test], name='Test'),
              row=2, col=2)
# Label the y-axes per subplot.
fig.update_yaxes(title_text="R² Score x 100", row=1, col=1)
fig.update_yaxes(title_text="Root Mean Square Error", row=1, col=2)
fig.update_yaxes(title_text="R² Score x 100", row=2, col=1)
fig.update_yaxes(title_text="Root Mean Square Error", row=2, col=2)
# Overall title, size, and grouped (side-by-side) bars.
fig.update_layout(title_text="Algorithm Comparison: R² Score & RMSE", height=700)
fig.update_layout(height=700, barmode='group')
fig.show()
From the results, it is clearly evident that the Gradient Boosting algorithm outperformed the others in both cases and on both evaluation metrics (RMSE and R² Score). Gradient boosting is a highly sophisticated algorithm, powerful enough to deal with many kinds of irregularities in the data. It is also extremely flexible, allowing users to customize a wide range of hyper-parameters while training the model, and ultimately to reach an optimal solution. However, this result comes at a substantial cost: as the table above shows, the Gradient Boosting algorithm took many more hours than the Decision Tree and simple Linear Regression. This could be improved by parallel computing (due to limited computational resources, I was unable to reduce the training time).
Overall, our models for predicting Taxi Demand(Passenger Count) and Gross Revenue in New York City performed well. The Gradient Boosting regression model performed best, likely due to its unique ability to capture complex feature dependencies. The decision tree regression model performed fairly better than Random Forest and Simple Regression. Our results and error analysis for the most part supported our intuitions about the usefulness of our feature.
We previously discussed the historic demand for Medallion Taxis based on time, pickup zones, gross revenue and trip types. Knowing that taxicab ridership in NYC has declined in recent years due to ride-hailing apps, Medallion Taxi owners and agencies must rethink their strategies to maintain their share of the market. One of the most important aspects of strategy development is to analyze demand and make data-driven decisions: determining where to position taxicabs, studying ridership patterns, determining when to position the cabs, and finally choosing the optimal working days to maximize revenue.
That said, this research has a few limitations, which can be addressed if the following areas are explored: